/*

 * Licensed to the Apache Software Foundation (ASF) under one

 * or more contributor license agreements.  See the NOTICE file

 * distributed with this work for additional information

 * regarding copyright ownership.  The ASF licenses this file

 * to you under the Apache License, Version 2.0 (the

 * "License"); you may not use this file except in compliance

 * with the License.  You may obtain a copy of the License at

 *

 *     http://www.apache.org/licenses/LICENSE-2.0

 *

 * Unless required by applicable law or agreed to in writing, software

 * distributed under the License is distributed on an "AS IS" BASIS,

 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.

 * See the License for the specific language governing permissions and

 * limitations under the License.

 */

package com.bff.gaia.unified.sdk.transforms;



import com.bff.gaia.unified.sdk.coders.Coder;

import com.bff.gaia.unified.sdk.coders.IterableCoder;

import com.bff.gaia.unified.sdk.coders.KvCoder;

import com.bff.gaia.unified.sdk.transforms.display.DisplayData;

import com.bff.gaia.unified.sdk.transforms.windowing.*;

import com.bff.gaia.unified.sdk.transforms.windowing.DefaultTrigger;

import com.bff.gaia.unified.sdk.transforms.windowing.GlobalWindows;

import com.bff.gaia.unified.sdk.transforms.windowing.InvalidWindows;

import com.bff.gaia.unified.sdk.transforms.windowing.TimestampCombiner;

import com.bff.gaia.unified.sdk.transforms.windowing.Window;

import com.bff.gaia.unified.sdk.transforms.windowing.WindowFn;

import com.bff.gaia.unified.sdk.values.KV;

import com.bff.gaia.unified.sdk.values.PCollection;

import com.bff.gaia.unified.sdk.values.WindowingStrategy;

import com.bff.gaia.unified.sdk.transforms.join.CoGroupByKey;

import com.bff.gaia.unified.sdk.transforms.windowing.AfterWatermark;



/**

 * {@code GroupByKey<K, V>} takes a {@code PCollection<KV<K, V>>}, groups the values by key and

 * windows, and returns a {@code PCollection<KV<K, Iterable<V>>>} representing a map from each

 * distinct key and window of the input {@code PCollection} to an {@code Iterable} over all the

 * values associated with that key in the input per window. Absent repeatedly-firing {@link

 * Window#triggering triggering}, each key in the output {@code PCollection} is unique within each

 * window.

 *

 * <p>{@code GroupByKey} is analogous to converting a multi-map into a uni-map, and related to

 * {@code GROUP BY} in SQL. It corresponds to the "shuffle" step between the Mapper and the Reducer

 * in the MapReduce framework.

 *

 * <p>Two keys of type {@code K} are compared for equality <b>not</b> by regular Java {@link

 * Object#equals}, but instead by first encoding each of the keys using the {@code Coder} of the

 * keys of the input {@code PCollection}, and then comparing the encoded bytes. This admits

 * efficient parallel evaluation. Note that this requires that the {@code Coder} of the keys be

 * deterministic (see {@link Coder#verifyDeterministic()}). If the key {@code Coder} is not

 * deterministic, an exception is thrown at pipeline construction time.

 *

 * <p>By default, the {@code Coder} of the keys of the output {@code PCollection} is the same as

 * that of the keys of the input, and the {@code Coder} of the elements of the {@code Iterable}

 * values of the output {@code PCollection} is the same as the {@code Coder} of the values of the

 * input.

 *

 * <p>Example of use:

 *

 * <pre>{@code

 * PCollection<KV<String, Doc>> urlDocPairs = ...;

 * PCollection<KV<String, Iterable<Doc>>> urlToDocs =

 *     urlDocPairs.apply(GroupByKey.<String, Doc>create());

 * PCollection<R> results =

 *     urlToDocs.apply(ParDo.of(new DoFn<KV<String, Iterable<Doc>>, R>() }{

 *      {@code @ProcessElement

 *       public void processElement(ProcessContext c) {

 *         String url = c.element().getKey();

 *         Iterable<Doc> docsWithThatUrl = c.element().getValue();

 *         ... process all docs having that url ...

 *       }}}));

 * </pre>

 *

 * <p>{@code GroupByKey} is a key primitive in data-parallel processing, since it is the main way to

 * efficiently bring associated data together into one location. It is also a key determiner of the

 * performance of a data-parallel pipeline.

 *

 * <p>See {@link CoGroupByKey} for a way to group multiple input

 * PCollections by a common key at once.

 *

 * <p>See {@link Combine.PerKey} for a common pattern of {@code GroupByKey} followed by {@link

 * Combine.GroupedValues}.

 *

 * <p>When grouping, windows that can be merged according to the {@link WindowFn} of the input

 * {@code PCollection} will be merged together, and a window pane corresponding to the new, merged

 * window will be created. The items in this pane will be emitted when a trigger fires. By default

 * this will be when the input sources estimate there will be no more data for the window. See

 * {@link AfterWatermark} for details on the estimation.

 *

 * <p>The timestamp for each emitted pane is determined by the {@link

 * Window#withTimestampCombiner(TimestampCombiner) windowing operation}. The output {@code

 * PCollection} will have the same {@link WindowFn} as the input.

 *

 * <p>If the input {@code PCollection} contains late data or the {@link Window#triggering requested

 * TriggerFn} can fire before the watermark, then there may be multiple elements output by a {@code

 * GroupByKey} that correspond to the same key and window.

 *

 * <p>If the {@link WindowFn} of the input requires merging, it is not valid to apply another {@code

 * GroupByKey} without first applying a new {@link WindowFn} or applying {@link Window#remerge()}.

 *

 * @param <K> the type of the keys of the input and output {@code PCollection}s

 * @param <V> the type of the values of the input {@code PCollection} and the elements of the {@code

 *     Iterable}s in the output {@code PCollection}

 */

public class GroupByKey<K, V>
    extends PTransform<PCollection<KV<K, V>>, PCollection<KV<K, Iterable<V>>>> {

  /** Set once at construction; {@code true} only for instances built by the few-keys factory. */
  private final boolean fewKeys;

  /** Instances are obtained through the static factories only. */
  private GroupByKey(boolean fewKeys) {
    this.fewKeys = fewKeys;
  }

  // ---------------------------------------------------------------------------
  // Factories and accessors
  // ---------------------------------------------------------------------------

  /**
   * Creates a {@code GroupByKey<K, V>} {@code PTransform} with the few-keys flag turned off.
   *
   * @param <K> the type of the keys of the input and output {@code PCollection}s
   * @param <V> the type of the values of the input {@code PCollection} and the elements of the
   *     {@code Iterable}s in the output {@code PCollection}
   * @return a new transform instance
   */
  public static <K, V> GroupByKey<K, V> create() {
    return new GroupByKey<K, V>(false);
  }

  /**
   * Creates a {@code GroupByKey<K, V>} {@code PTransform} that assumes only a small number of
   * distinct keys will be grouped.
   *
   * @param <K> the type of the keys of the input and output {@code PCollection}s
   * @param <V> the type of the values of the input {@code PCollection} and the elements of the
   *     {@code Iterable}s in the output {@code PCollection}
   * @return a new transform instance with the few-keys flag turned on
   */
  static <K, V> GroupByKey<K, V> createWithFewKeys() {
    return new GroupByKey<K, V>(true);
  }

  /**
   * Reports the few-keys flag this transform was constructed with.
   *
   * @return {@code true} if the transform came from {@link #createWithFewKeys()}, {@code false}
   *     if it came from {@link #create()}
   */
  public boolean fewKeys() {
    return fewKeys;
  }

  // ---------------------------------------------------------------------------
  // Validation and windowing
  // ---------------------------------------------------------------------------

  /**
   * Checks, from the windowing strategy of {@code input}, that grouping it by key is allowed.
   *
   * <p>Two situations are rejected:
   *
   * <ul>
   *   <li>the input is not bounded, yet it still uses {@link GlobalWindows} together with the
   *       {@link DefaultTrigger};
   *   <li>the input's {@link WindowFn} is an {@link InvalidWindows} marker.
   * </ul>
   *
   * @param input the collection that is about to be grouped
   * @throws IllegalStateException if either of the situations above applies
   */
  public static void applicableTo(PCollection<?> input) {
    WindowingStrategy<?, ?> strategy = input.getWindowingStrategy();
    WindowFn<?, ?> windowFn = strategy.getWindowFn();

    // A non-bounded input left in the global window with the default trigger would wait for a
    // watermark (end of the global window) that is never reached, so it is refused up front.
    boolean globalWindowWithDefaultTrigger =
        windowFn instanceof GlobalWindows && strategy.getTrigger() instanceof DefaultTrigger;
    if (globalWindowWithDefaultTrigger && input.isBounded() != PCollection.IsBounded.BOUNDED) {
      throw new IllegalStateException(
          "GroupByKey cannot be applied to non-bounded PCollection in "
              + "the GlobalWindow without a trigger. Use a Window.into or Window.triggering transform "
              + "prior to GroupByKey.");
    }

    // The window merge function itself has to be usable.
    if (windowFn instanceof InvalidWindows) {
      InvalidWindows<?> invalidWindows = (InvalidWindows<?>) windowFn;
      String reason = invalidWindows.getCause();
      throw new IllegalStateException(
          "GroupByKey must have a valid Window merge function.  " + "Invalid because: " + reason);
    }
  }

  /**
   * Derives the windowing strategy of the grouped output from the strategy of the input.
   *
   * <p>A merging {@link WindowFn} is replaced by an {@link InvalidWindows} wrapper, and the
   * trigger is replaced by its continuation trigger. A non-merging {@link WindowFn} is kept.
   *
   * @param inputStrategy the windowing strategy of the input {@code PCollection}
   * @return the windowing strategy to attach to the output {@code PCollection}
   */
  public WindowingStrategy<?, ?> updateWindowingStrategy(WindowingStrategy<?, ?> inputStrategy) {
    WindowFn<?, ?> outputWindowFn = blockImplicitRemerge(inputStrategy.getWindowFn());

    // Besides the window function, the trigger moves on to its continuation form.
    return inputStrategy
        .withWindowFn(outputWindowFn)
        .withTrigger(inputStrategy.getTrigger().getContinuationTrigger());
  }

  /**
   * Returns {@code windowFn} untouched when it is non-merging; otherwise wraps it so that windows
   * are not merged a second time without explicit user involvement, e.g., by Window.into() or
   * Window.remerge().
   */
  private static WindowFn<?, ?> blockImplicitRemerge(WindowFn<?, ?> windowFn) {
    if (windowFn.isNonMerging()) {
      return windowFn;
    }
    return new InvalidWindows<>(
        "WindowFn has already been consumed by previous GroupByKey", windowFn);
  }

  /**
   * Validates the input and produces the primitive grouped output collection.
   *
   * @param input the keyed collection to group
   * @return the output collection, carrying the updated windowing strategy, the boundedness of the
   *     input, and the coder from {@link #getOutputKvCoder}
   * @throws IllegalStateException if {@link #applicableTo} rejects the input, if the input coder
   *     is not a {@link KvCoder}, or if the key coder is not deterministic
   */
  @Override
  public PCollection<KV<K, Iterable<V>>> expand(PCollection<KV<K, V>> input) {
    applicableTo(input);

    // The input must be encoded with a KvCoder<K, V> whose key coder is deterministic.
    requireDeterministicKeyCoder(getKeyCoder(input.getCoder()));

    // The primitive groups by key and window together, merging windows where needed; it relies on
    // the windows already assigned to the input elements and on the merge operation of the window
    // function attached to the input PCollection.
    return PCollection.createPrimitiveOutputInternal(
        input.getPipeline(),
        updateWindowingStrategy(input.getWindowingStrategy()),
        input.isBounded(),
        getOutputKvCoder(input.getCoder()));
  }

  /**
   * Fails with an {@link IllegalStateException} (keeping the original exception as cause) when
   * {@code keyCoder} does not pass {@code verifyDeterministic()}.
   */
  private static void requireDeterministicKeyCoder(Coder<?> keyCoder) {
    try {
      keyCoder.verifyDeterministic();
    } catch (Coder.NonDeterministicException nonDeterministic) {
      throw new IllegalStateException(
          "the keyCoder of a GroupByKey must be deterministic", nonDeterministic);
    }
  }

  // ---------------------------------------------------------------------------
  // Coder helpers
  // ---------------------------------------------------------------------------

  /**
   * Views the input {@code Coder} of this transform as the {@code KvCoder} it is required to be.
   *
   * @param inputCoder the coder of the input {@code PCollection}
   * @return {@code inputCoder}, cast to {@code KvCoder}
   * @throws IllegalStateException if {@code inputCoder} is not a {@code KvCoder}
   */
  @SuppressWarnings("unchecked")
  static <K, V> KvCoder<K, V> getInputKvCoder(Coder<KV<K, V>> inputCoder) {
    if (inputCoder instanceof KvCoder) {
      return (KvCoder<K, V>) inputCoder;
    }
    throw new IllegalStateException("GroupByKey requires its input to use KvCoder");
  }

  /**
   * Extracts the key {@code Coder} of the input; the same coder is used for the keys of the
   * output of this transform.
   *
   * @param inputCoder the coder of the input {@code PCollection}
   * @return the key coder of the input
   * @throws IllegalStateException if {@code inputCoder} is not a {@code KvCoder}
   */
  public static <K, V> Coder<K> getKeyCoder(Coder<KV<K, V>> inputCoder) {
    KvCoder<K, V> kvCoder = getInputKvCoder(inputCoder);
    return kvCoder.getKeyCoder();
  }

  /**
   * Extracts the value {@code Coder} of the input to this transform.
   *
   * @param inputCoder the coder of the input {@code PCollection}
   * @return the value coder of the input
   * @throws IllegalStateException if {@code inputCoder} is not a {@code KvCoder}
   */
  public static <K, V> Coder<V> getInputValueCoder(Coder<KV<K, V>> inputCoder) {
    KvCoder<K, V> kvCoder = getInputKvCoder(inputCoder);
    return kvCoder.getValueCoder();
  }

  /**
   * Builds the {@code Coder} for the {@code Iterable} values of the output by wrapping the input
   * value coder in an {@link IterableCoder}.
   *
   * @param inputCoder the coder of the input {@code PCollection}
   * @return the coder for the grouped values
   * @throws IllegalStateException if {@code inputCoder} is not a {@code KvCoder}
   */
  static <K, V> Coder<Iterable<V>> getOutputValueCoder(Coder<KV<K, V>> inputCoder) {
    Coder<V> elementCoder = getInputValueCoder(inputCoder);
    return IterableCoder.of(elementCoder);
  }

  /**
   * Builds the {@code Coder} of the output of this transform: the input key coder paired with the
   * coder from {@link #getOutputValueCoder}.
   *
   * @param inputCoder the coder of the input {@code PCollection}
   * @return the coder for the output key / grouped-values pairs
   * @throws IllegalStateException if {@code inputCoder} is not a {@code KvCoder}
   */
  public static <K, V> KvCoder<K, Iterable<V>> getOutputKvCoder(Coder<KV<K, V>> inputCoder) {
    Coder<K> keyCoder = getKeyCoder(inputCoder);
    Coder<Iterable<V>> groupedValuesCoder = getOutputValueCoder(inputCoder);
    return KvCoder.of(keyCoder, groupedValuesCoder);
  }

  /**
   * Contributes this transform's display data: after the superclass entries, a {@code "fewKeys"}
   * item labelled "Has Few Keys" is added, but only when the few-keys flag is set.
   *
   * @param builder the builder to register display data with
   */
  @Override
  public void populateDisplayData(DisplayData.Builder builder) {
    super.populateDisplayData(builder);
    if (!fewKeys) {
      return;
    }
    builder.add(DisplayData.item("fewKeys", true).withLabel("Has Few Keys"));
  }
}